{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "# Spark Basics 2"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Chaining"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "We can **chain** transformations and aaction to create a computation **pipeline**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "Suppose we want to compute the sum of the squares\n",
    "$$ \\sum_{i=1}^n x_i^2 $$\n",
    "where the elements $x_i$ are stored in an RDD."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<SparkContext master=local[4] appName=pyspark-shell>\n"
     ]
    }
   ],
   "source": [
    "#start the SparkContext\n",
    "import findspark\n",
    "findspark.init()\n",
    "\n",
    "from pyspark import SparkContext\n",
    "sc = SparkContext(master=\"local[4]\")\n",
    "print(sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Create an RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "B=sc.parallelize(range(4))\n",
    "B.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Sequential syntax for chaining\n",
    "Perform assignment after each computation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "14"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "Squares=B.map(lambda x:x*x)\n",
    "Squares.reduce(lambda x,y:x+y) "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Cascaded syntax for chaining\n",
    "Combine computations into a single cascaded command"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "14"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "B.map(lambda x:x*x)\\\n",
    "   .reduce(lambda x,y:x+y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Both syntaxes mean exactly the same thing\n",
    "The only difference:\n",
    "* In the sequential syntax the intermediate RDD has a name `Squares`\n",
    "* In the cascaded syntax the intermediate RDD is *anonymous*\n",
    "\n",
    "The execution is identical!"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Sequential execution\n",
    "The standard way that the map and reduce are executed is\n",
    "* perform the map\n",
    "* store the resulting RDD in memory\n",
    "* perform the reduce"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "### Disadvantages of Sequential execution\n",
    "\n",
    "1. Intermediate result (`Squares`) requires memory space.\n",
    "2. Two scans of memory (of `B`, then of `Squares`) - double the cache-misses."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Pipelined execution\n",
    "Perform the whole computation in a single pass. For each element of **`B`**\n",
    "1. Compute the square\n",
    "2. Enter the square as input to the `reduce` operation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "### Advantages of Pipelined execution\n",
    "\n",
    "1. Less memory required - intermediate result is not stored.\n",
    "2. Faster - only one pass through the Input RDD."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "### Lazy Evaluation\n",
    "This type of pipelined evaluation is related to **Lazy Evaluation**. The word **Lazy** is used because the first command (computing the square) is not executed immediately. Instead, the execution is delayed as long as possible so that several commands are executed in a single pass.\n",
    "\n",
    "The delayed commands are organized in an **Execution plan**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "For more on Pipelined execution, Lazy evaluation and Execution Plans see [spark programming guide/RDD operations](http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### An instructive mistake\n",
    "Here is another way to compute the sum of the squares using a single reduce command. Can you figure out how it comes up with this unexpected result?"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "8"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "C=sc.parallelize([1,1,2])\n",
    "C.reduce(lambda x,y: x*x+y*y)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "#### Answer:\n",
    "1. `reduce` first operates on the pair $(1,1)$, replacing it with  $1^2+1^2 = 2$\n",
    "2. `reduce` then operates on the pair $(2,2)$, giving the final result $2^2+2^2=8$   "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## getting information about an RDD\n",
    "RDD's typically have hundreds of thousands of elements. It usually makes no sense to print out the content of a whole RDD. Here are some ways to get manageable amounts of information about an RDD"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "Create an RDD of length **`n`** which is a repetition of the pattern `1,2,3,4`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true,
    "scrolled": true,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [],
   "source": [
    "n=1000000\n",
    "B=sc.parallelize([1,2,3,4]*int(n/4))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1000000"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#find the number of elements in the RDD\n",
    "B.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "first element= 1\n",
      "first 5 elements =  [1, 2, 3, 4, 1]\n"
     ]
    }
   ],
   "source": [
    "# get the first few elements of an RDD\n",
    "print('first element=',B.first())\n",
    "print('first 5 elements = ',B.take(5))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Sampling an RDD\n",
    "* RDDs are often very large.\n",
    "* Aggregates, such as averages, can be approximated efficiently by using a sample.\n",
    "* Sampling is done in parallel and requires limited computation."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "The method `RDD.sample(withReplacement,p)` generates a sample of the elements of the RDD. where\n",
    "- `withReplacement` is a boolean flag indicating whether or not a an element in the RDD can be sampled more than once.\n",
    "- `p` is the probability of accepting each element into the sample. Note that as the sampling is performed independently in each partition, the number of elements in the sample changes from sample to sample."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false,
    "scrolled": true,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "sample1= [4, 4, 4]\n",
      "sample2= [2, 2]\n"
     ]
    }
   ],
   "source": [
    "# get a sample whose expected size is m\n",
    "# Note that the size of the sample is different in different runs\n",
    "m=5.\n",
    "print('sample1=',B.sample(False,m/n).collect()) \n",
    "print('sample2=',B.sample(False,m/n).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "skip"
    }
   },
   "source": [
    "### Things to note and think about\n",
    "* Each time you run the previous cell, you get a different estimate\n",
    "* The accuracy of the estimate is determined by the size of the sample $n*p$\n",
    "* See how the error changes as you vary $p$\n",
    "* Can you give a formula that relates the variance of the estimate to $(p*n)$ ? (The answer is in the Probability and statistics course)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### filtering an RDD\n",
    "The method `RDD.filter(func)` Return a new dataset formed by selecting those elements of the source on which func returns true.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "the number of elements in B that are > 3 = 250000\n"
     ]
    }
   ],
   "source": [
    "print('the number of elements in B that are > 3 =',B.filter(lambda n: n > 3).count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Removing duplicate elements from an RDD\n",
    "The method `RDD.distinct()` Returns a new dataset that contains the distinct elements of the source dataset.\n",
    "\n",
    "This operation requires a **shuffle** in order to detect duplication across partitions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DuplicateRDD= [1, 1, 2, 2, 3, 3]\n",
      "DistinctRDD =  [1, 2, 3]\n"
     ]
    }
   ],
   "source": [
    "# Remove duplicate element in DuplicateRDD, we get distinct RDD\n",
    "DuplicateRDD = sc.parallelize([1,1,2,2,3,3])\n",
    "print('DuplicateRDD=',DuplicateRDD.collect())\n",
    "print('DistinctRDD = ',DuplicateRDD.distinct().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### flatmap an RDD\n",
    "The method `RDD.flatMap(func)` is similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "map: [['you', 'are', 'my', 'sunshine'], ['my', 'only', 'sunshine']]\n",
      "flatmap: ['you', 'are', 'my', 'sunshine', 'my', 'only', 'sunshine']\n"
     ]
    }
   ],
   "source": [
    "text=[\"you are my sunshine\",\"my only sunshine\"]\n",
    "text_file = sc.parallelize(text)\n",
    "# map each line in text to a list of words\n",
    "print('map:',text_file.map(lambda line: line.split(\" \")).collect())\n",
    "# create a single list of words by combining the words from all of the lines\n",
    "print('flatmap:',text_file.flatMap(lambda line: line.split(\" \")).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "### Set operations\n",
    "In this part, we explore set operations including **union**,**intersection**,**subtract**, **cartesian** in pyspark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [],
   "source": [
    "rdd1 = sc.parallelize([1, 1, 2, 3])\n",
    "rdd2 = sc.parallelize([1, 3, 4, 5])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "1. union(other)\n",
    " * Return the union of this RDD and another one.\n",
    " * Note that that repetitions are allowed. The RDDs are **bags** not **sets**\n",
    " * To make the result a set, use `.distinct`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "rdd1= [1, 1, 2, 3]\n",
      "rdd2= ['a', 'b', 1]\n",
      "union as bags = [1, 1, 2, 3, 'a', 'b', 1]\n",
      "union as sets = [1, 'a', 2, 3, 'b']\n"
     ]
    }
   ],
   "source": [
    "rdd2=sc.parallelize(['a','b',1])\n",
    "print('rdd1=',rdd1.collect())\n",
    "print('rdd2=',rdd2.collect())\n",
    "print('union as bags =',rdd1.union(rdd2).collect())\n",
    "print('union as sets =',rdd1.union(rdd2).distinct().collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "2. intersection(other)\n",
    " * Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.Note that this method performs a shuffle internally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "rdd1= [1, 1, 2, 3]\n",
      "rdd2= [1, 1, 2, 5]\n",
      "intersection= [1, 2]\n"
     ]
    }
   ],
   "source": [
    "rdd2=sc.parallelize([1,1,2,5])\n",
    "print('rdd1=',rdd1.collect())\n",
    "print('rdd2=',rdd2.collect())\n",
    "print('intersection=',rdd1.intersection(rdd2).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "3. subtract(other, numPartitions=None)\n",
    " * Return each value in self that is not contained in other."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "rdd1= [1, 1, 2, 3]\n",
      "rdd2= [1, 1, 2, 5]\n",
      "rdd1.subtract(rdd2)= [3]\n"
     ]
    }
   ],
   "source": [
    "print('rdd1=',rdd1.collect())\n",
    "print('rdd2=',rdd2.collect())\n",
    "print('rdd1.subtract(rdd2)=',rdd1.subtract(rdd2).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "subslide"
    }
   },
   "source": [
    "4. cartesian(other)\n",
    " * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where **a** is in **self** and **b** is in **other**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false,
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "rdd1= [1, 1, 2, 3]\n",
      "rdd2= ['a', 'b']\n",
      "rdd1.cartesian(rdd2)=\n",
      " [(1, 'a'), (1, 'b'), (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]\n"
     ]
    }
   ],
   "source": [
    "rdd2=sc.parallelize([1,1,2])\n",
    "rdd2=sc.parallelize(['a','b'])\n",
    "print('rdd1=',rdd1.collect())\n",
    "print('rdd2=',rdd2.collect())\n",
    "print('rdd1.cartesian(rdd2)=\\n',rdd1.cartesian(rdd2).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true,
    "slideshow": {
     "slide_type": "slide"
    }
   },
   "source": [
    "## Summary\n",
    "* Chaining: creating a pipeline of RDD operations.\n",
    "* counting, taking and sampling an RDD\n",
    "* More Transformations: `filter, distinct, flatmap`\n",
    "* Set transformations: `union, intersection, subtract, cartesian`"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "slideshow": {
     "slide_type": "fragment"
    }
   },
   "source": [
    "* See you next time!"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "celltoolbar": "Slideshow",
  "hide_input": false,
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": "block",
   "toc_window_display": false
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
